[FEAT] buffered channel drain single collect #52

Merged
niedbalski merged 7 commits into main from pwhelan-feat-buffered-channel-drain-single-collect on Oct 6, 2023


@pwhelan pwhelan commented Oct 4, 2023

This is a minor change on top of #44 that ensures Collect is invoked once and only once. The behavior is now documented, and tests have been added to make sure it stays that way.

This is per @nicolasparada; @niedbalski may have been working under the same assumption. I know of at least one plugin that might be affected by this, though it may well be the only one.
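
For readers following along, the "invoke Collect once and only once" guarantee can be sketched as below. This is only an illustration and not the code in this PR: the `collector` type, its `Start` method, and the `startMany` helper are hypothetical names, and the sketch assumes `sync.Once` is an acceptable way to express the guard.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// collector wraps a hypothetical collect callback so that it runs at most
// once, no matter how many times the host asks for it to be started.
type collector struct {
	once    sync.Once
	calls   int32 // number of times collect actually ran
	collect func()
}

// Start runs collect the first time it is called; later calls are no-ops.
func (c *collector) Start() {
	c.once.Do(func() {
		atomic.AddInt32(&c.calls, 1)
		c.collect()
	})
}

// startMany calls Start n times concurrently and reports how often the
// wrapped collect function really ran.
func startMany(n int) int32 {
	c := &collector{collect: func() {}}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Start()
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&c.calls)
}

func main() {
	fmt.Println("collect ran", startMany(10), "time(s)") // prints: collect ran 1 time(s)
}
```

A test in the same spirit would call `Start` repeatedly and assert that the counter stays at one.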

Contributor

@nicolasparada nicolasparada left a comment

For some reason I cannot see the full diff even though this PR is targeting main...
Anyway, it looks good, but I noticed a change of behavior here: https://github.com/calyptia/plugin/blob/f16bef3495ec8e7b50eb620377ac6867fcceb3f2/cshared.go#L190-L192
Before, if the channel was closed, it returned OK; now it returns an error.
I don't mind either one, I just want to be sure what we want to do here.
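
To make the two options concrete, here is a minimal sketch of how a receive on a closed channel can be mapped to either outcome. It is not the code at the linked lines: `next`, `errChannelClosed`, and the `closedIsError` flag are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errChannelClosed is a hypothetical sentinel for the "return an error" option.
var errChannelClosed = errors.New("channel closed")

// next receives one record from ch. When the channel has been closed, the
// comma-ok form reports ok == false, and closedIsError decides whether that
// is surfaced as an error or treated as a normal, empty result.
func next(ch <-chan string, closedIsError bool) (string, error) {
	rec, ok := <-ch
	if !ok {
		if closedIsError {
			return "", errChannelClosed
		}
		return "", nil
	}
	return rec, nil
}

func main() {
	ch := make(chan string)
	close(ch)

	_, err := next(ch, false)
	fmt.Println("closed treated as ok:", err) // err is nil here

	_, err = next(ch, true)
	fmt.Println("closed treated as error:", err) // err is errChannelClosed here
}
```

Either choice is workable; what matters is that callers know which one to expect.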

@niedbalski
Contributor

@nicolasparada do you still see the issue with the latency/delays that devo is reporting?

@nicolasparada
Contributor

No, getting nanosecond measurements :)

@niedbalski
Contributor

@pwhelan can you please resolve the conflicts here.

@pwhelan pwhelan mentioned this pull request Oct 5, 2023
Due to the revert in #44 I was unable to rebase #52 interactively so the
changes will have to be applied in a single patch. This unfortunately
means the loss of git history, but so be it. Here is the changelog:

  * test: add a sleep in the infinite call to give time for collect to be
    accidentally invoked after Done().
  * input: invoke Collect only once and only ever once. Tests have been
    updated to enforce this behavior.
  * test: improve dangling test.
  * test: add latency test.

Signed-off-by: Phillip Whelan <[email protected]>
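
As background for the "buffered channel drain" in the PR title, here is a minimal sketch of draining a buffered channel without blocking. It is an assumption about the general technique and not the implementation in this PR; `drain` and its `max` parameter are hypothetical.

```go
package main

import "fmt"

// drain removes up to max records that are already buffered in ch and
// returns them. It never blocks: when the buffer is empty (or the channel
// is closed) it returns what it has collected so far.
func drain(ch <-chan string, max int) []string {
	var out []string
	for len(out) < max {
		select {
		case rec, ok := <-ch:
			if !ok {
				return out // channel closed, nothing more to read
			}
			out = append(out, rec)
		default:
			return out // buffer is empty right now
		}
	}
	return out
}

func main() {
	ch := make(chan string, 8)
	for _, rec := range []string{"bar", "bat", "bad"} {
		ch <- rec
	}
	fmt.Println(drain(ch, 10))      // prints [bar bat bad]
	fmt.Println(len(drain(ch, 10))) // prints 0
}
```

Draining everything that is already buffered in one pass avoids waiting for a separate wakeup per record.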
@pwhelan pwhelan force-pushed the pwhelan-feat-buffered-channel-drain-single-collect branch from f16bef3 to 266c000 Compare October 5, 2023 15:03
@pwhelan
Contributor Author

pwhelan commented Oct 5, 2023

@nicolasparada The tests can fail intermittently. I opened an issue on the backlog: https://app.asana.com/0/1205296321537082/1205660623899255/f. I mention it in case you can spot the root cause, and so that you can run the tests a few times to confirm they are sane.

@pwhelan
Contributor Author

pwhelan commented Oct 5, 2023

@pwhelan can you please resolve the conflicts here.

It is done. I had to reapply with a patch.

@pwhelan
Contributor Author

pwhelan commented Oct 5, 2023

@niedbalski I have changed CI to run each test individually... which should be easy enough to revert later when we work on hot-reload.

Here is a log from running http_loader, built with the latest version of the plugin from this branch, against an HTTP server that serves a simple JSON file:

fluent-bit -c test.conf
Fluent Bit v2.1.10
* Copyright (C) 2015-2022 The Fluent Bit Authors
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2023/10/05 16:28:06] [ info] Configuration:
[2023/10/05 16:28:06] [ info]  flush time     | 1.000000 seconds
[2023/10/05 16:28:06] [ info]  grace          | 5 seconds
[2023/10/05 16:28:06] [ info]  daemon         | 0
[2023/10/05 16:28:06] [ info] ___________
[2023/10/05 16:28:06] [ info]  inputs:
[2023/10/05 16:28:06] [ info]      http_loader
[2023/10/05 16:28:06] [ info] ___________
[2023/10/05 16:28:06] [ info]  filters:
[2023/10/05 16:28:06] [ info] ___________
[2023/10/05 16:28:06] [ info]  outputs:
[2023/10/05 16:28:06] [ info]      stdout.0
[2023/10/05 16:28:06] [ info] ___________
[2023/10/05 16:28:06] [ info]  collectors:
[2023/10/05 16:28:06] [ info] [fluent bit] version=2.1.10, commit=1a41f49dc2, pid=602128
[2023/10/05 16:28:06] [debug] [engine] coroutine stack size: 24576 bytes (24.0K)
[2023/10/05 16:28:06] [ info] [storage] ver=1.4.0, type=memory, sync=normal, checksum=off, max_chunks_up=128
[2023/10/05 16:28:06] [ info] [cmetrics] version=0.6.3
[2023/10/05 16:28:06] [ info] [ctraces ] version=0.3.1
[2023/10/05 16:28:06] [ info] [input:http_loader:http_loader.0] initializing
[2023/10/05 16:28:06] [ info] [input:http_loader:http_loader.0] storage_strategy='memory' (memory only)
[2023/10/05 16:28:06] [ info] http_loader: defaulting method to GET
[2023/10/05 16:28:06] [ info] http_loader: defaulting skip to `{{or (ge .Response.StatusCode 400) (empty .Response.Body)}}`
[2023/10/05 16:28:06] [debug] Using storage key "http_loader_v2_da76aa41"
[2023/10/05 16:28:06] [debug] Data loaded from local disk storage with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:06] [debug] Successfully json unmarshalled loaded data http://localhost:8888/test.json
[2023/10/05 16:28:06] [debug] Failed to json unmarshal stored request body, returning original string ""
[2023/10/05 16:28:06] [debug] Successfully processed loaded data with index index 2
[2023/10/05 16:28:06] [debug] [input:http_loader:http_loader.0] [thread init] initialization OK
[2023/10/05 16:28:06] [ info] [input:http_loader:http_loader.0] thread instance initialized
[2023/10/05 16:28:06] [debug] [http_loader:http_loader.0] created event channels: read=30 write=34
[2023/10/05 16:28:06] [debug] [stdout:stdout.0] created event channels: read=37 write=38
[2023/10/05 16:28:06] [ info] [sp] stream processor started
[2023/10/05 16:28:06] [ info] [output:stdout:stdout.0] worker #0 started
[2023/10/05 16:28:07] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:07] [debug] Collect invoked
[2023/10/05 16:28:07] [debug] Sending request: method="GET" url="http://localhost:8888/test.json" headers=map[User-Agent:[Fluent-Bit HTTP Loader Plugin]] body=""
[2023/10/05 16:28:07] [debug] Received response: status_code=200 headers=map[Content-Length:[112] Content-Type:[application/json] Date:[Thu, 05 Oct 2023 19:28:07 GMT] Last-Modified:[Thu, 05 Oct 2023 19:26:49 GMT] Server:[SimpleHTTP/0.6 Python/3.11.5]] body="{\n\t\"records\": [\n\t\t{\"foo\": \"bar\"},\n\t\t{\"foo\": \"bat\"},\n\t\t{\"foo\": \"bad\"},\n\t\t{\"foo\": \"bath\"},\n\t\t{\"foo\": \"barf\"}\n\t]\n}\n"
[2023/10/05 16:28:07] [debug] Sending record: map[value:records]
[2023/10/05 16:28:07] [debug] Initiating store process with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:07] [debug] Successfully stored data in local disk storage with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:07] [ warn] Error updating metadata in cloud API with key "http_loader_v2_da76aa41": cloud api: client not initialized or pipelineID missing
[2023/10/05 16:28:08] [debug] [input chunk] update output instances with new chunk size diff=26, records=1, input=http_loader.0
[2023/10/05 16:28:09] [debug] [task] created task=0x7fff9001fc60 id=0 OK
[2023/10/05 16:28:09] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/10/05 16:28:09] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
{"date":1696534087.430507,"value":"records"}
[2023/10/05 16:28:09] [debug] [out flush] cb_destroy coro_id=0
[2023/10/05 16:28:09] [debug] [task] destroy task=0x7fff9001fc60 (task_id=0)
[2023/10/05 16:28:10] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:11] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:12] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:13] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:14] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:15] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:16] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:17] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:17] [debug] Sending request: method="GET" url="http://localhost:8888/test.json" headers=map[User-Agent:[Fluent-Bit HTTP Loader Plugin]] body=""
[2023/10/05 16:28:17] [debug] Received response: status_code=200 headers=map[Content-Length:[112] Content-Type:[application/json] Date:[Thu, 05 Oct 2023 19:28:17 GMT] Last-Modified:[Thu, 05 Oct 2023 19:26:49 GMT] Server:[SimpleHTTP/0.6 Python/3.11.5]] body="{\n\t\"records\": [\n\t\t{\"foo\": \"bar\"},\n\t\t{\"foo\": \"bat\"},\n\t\t{\"foo\": \"bad\"},\n\t\t{\"foo\": \"bath\"},\n\t\t{\"foo\": \"barf\"}\n\t]\n}\n"
[2023/10/05 16:28:17] [debug] Sending record: map[value:records]
[2023/10/05 16:28:17] [debug] Initiating store process with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:17] [debug] Successfully stored data in local disk storage with key "http_loader_v2_da76aa41"
[2023/10/05 16:28:17] [ warn] Error updating metadata in cloud API with key "http_loader_v2_da76aa41": cloud api: client not initialized or pipelineID missing
[2023/10/05 16:28:18] [debug] [input chunk] update output instances with new chunk size diff=26, records=1, input=http_loader.0
[2023/10/05 16:28:19] [debug] [task] created task=0x7fff9005fee0 id=0 OK
[2023/10/05 16:28:19] [debug] [output:stdout:stdout.0] task_id=0 assigned to thread #0
[2023/10/05 16:28:19] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
{"date":1696534097.481164,"value":"records"}
[2023/10/05 16:28:19] [debug] [out flush] cb_destroy coro_id=1
[2023/10/05 16:28:19] [debug] [task] destroy task=0x7fff9005fee0 (task_id=0)
[2023/10/05 16:28:20] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:21] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
[2023/10/05 16:28:22] [error] [/home/pwhelan/Projects/work/fluent-bit.git/2.1.9/src/flb_input_chunk.c:1696 errno=0] Success
^C[2023/10/05 16:28:23] [engine] caught signal (SIGINT)
[2023/10/05 16:28:23] [ warn] [engine] service will shutdown in max 5 seconds
[2023/10/05 16:28:23] [debug] [input:http_loader:http_loader.0] thread pause instance
[2023/10/05 16:28:23] [ info] [engine] service has stopped (0 pending tasks)
[2023/10/05 16:28:23] [debug] [input:http_loader:http_loader.0] thread pause instance
[2023/10/05 16:28:23] [ info] [output:stdout:stdout.0] thread worker #0 stopping...
[2023/10/05 16:28:23] [ info] [output:stdout:stdout.0] thread worker #0 stopped
[2023/10/05 16:28:23] [debug] [GO] running exit callback
2023/10/05 16:28:23 calling FLBPluginExit(): name="http_loader"
[2023/10/05 16:28:23] [debug] [input:http_loader:http_loader.0] thread exit instance

This is with vanilla fluent-bit v2.1.10 (possibly compiled from master).

It also works with core-fluent-bit.

@pwhelan
Contributor Author

pwhelan commented Oct 5, 2023

No, getting nanosecond measurements :)

@nicolasparada are we still good here?

@niedbalski niedbalski requested a review from cosmo0920 October 6, 2023 05:38
Contributor

@nicolasparada nicolasparada left a comment

Looks good. Splitting an array and sending all the records now takes nanoseconds, and the stop condition on the http_loader is working fine :)
Awesome work 👍

Contributor

@cosmo0920 cosmo0920 left a comment

I added some comments that do not need to be addressed right away. Overall, this looks good to me.

output/decoder.go (resolved)
cshared.go (resolved)
@niedbalski niedbalski merged commit 4c8336c into main Oct 6, 2023
10 checks passed
@nicolasparada nicolasparada deleted the pwhelan-feat-buffered-channel-drain-single-collect branch June 6, 2024 14:35